package com.elastic.search.listener;

import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.elastic.search.event.DeleteCanalEvent;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * Canal listener that handles row-delete events by removing the matching
 * document from Elasticsearch.
 *
 * @author huangdeyao
 */
@Component
public class DeleteCanalListener extends AbstractCanalListener<DeleteCanalEvent> {
    private static final Logger logger = LoggerFactory.getLogger(DeleteCanalListener.class);

    /** Elasticsearch client used to issue the delete request; injected by Spring. */
    @Autowired
    RestHighLevelClient restHighLevelClient;

    /**
     * Deletes from {@code index} the document whose id is taken from the deleted row.
     *
     * <p>The row's "before" columns are converted to a map with
     * {@code parseColumnsToMap}, and the value stored under {@code ES_ID} is used as
     * the document id. When that value is absent, no request is sent and a warning is
     * logged. An {@link IOException} raised by the client is logged at error level
     * (with its stack trace) and not rethrown; any other exception propagates to the
     * caller unchanged.
     *
     * @param database name of the source database; only used for logging here
     * @param table    name of the source table; only used for logging here
     * @param index    name of the Elasticsearch index to delete from
     * @param rowData  Canal row change; its before-image columns supply the document id
     */
    @Override
    protected void doSync(String database, String table, String index, RowData rowData) {
        logger.info("==============================delete===============================================");
        logger.info("database:{},table:{},index:{},rowData:{}", database, table, index, rowData);

        // For a delete only the before-image describes the removed row.
        Map<String, Object> jsonMap = parseColumnsToMap(rowData.getBeforeColumnsList());
        Object documentId = jsonMap.get(ES_ID);
        if (documentId == null) {
            // Without an id there is nothing to target; surface it as a warning so
            // skipped deletes are visible in the logs.
            logger.warn("=========实时同步数据 delete 没有表id字段======");
            return;
        }

        DeleteRequest deleteRequest = new DeleteRequest(index, documentId.toString());
        deleteRequest.timeout("1s");
        try {
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            logger.info("id==>{}======== 实时同步数据 delete 结果 ======>{}", documentId, deleteResponse.status());
            logger.info("{}", deleteResponse);
        } catch (IOException e) {
            // Route the failure (including the stack trace) through the logger
            // instead of printing it to stderr.
            logger.error("=========实时同步数据 delete异常======", e);
        }
    }
}
